package com.k2de.consumer.impl;

import com.k2data.common.config.InitConfig;
import com.k2data.platform.ddm.k2db.realtime.data.common.util.MethodTimeRecorderUtil;
import com.k2de.consumer.config.InitConfigForConsumer;
import com.k2de.consumer.inter.MessageConsumerInter;
import com.k2de.consumer.inter.MessageToRecordConverterInter;
import com.k2de.consumer.result.KmxRecordResult;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Kafka consumer base class.
 *
 * <p>Reads String-keyed, String-valued messages from a single stream of a topic, optionally keeps
 * only those whose key starts with a configured prefix, converts each kept message with the
 * configured {@link MessageToRecordConverterInter} and passes the result to
 * {@code sendKmxRecordResult}.
 *
 * Created by chenjingshuai on 18-1-9.
 */
public abstract class KafkaConsumer implements MessageConsumerInter {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    // Underlying Kafka consumer connector.
    private final ConsumerConnector consumer;
    /**
     * Key prefix of the messages consumed by {@linkplain #consumer}; used to consume only messages
     * with matching keys. {@code null} means every message is consumed.
     */
    private final String consumedKafkaKeyPrefix;

    /**
     * Converter applied to each consumed message; supplied through
     * {@link #setMessageToRecordConverter(MessageToRecordConverterInter)}.
     */
    private MessageToRecordConverterInter messageToRecordConverterInter;

    /**
     * Creates a consumer that accepts every key and uses the group name and offset-reset policy
     * from {@link InitConfigForConsumer}, with automatic offset commit enabled.
     *
     * @param zkUrl value for the {@code zookeeper.connect} consumer property
     */
    public KafkaConsumer(String zkUrl) {
        this(zkUrl, null, InitConfigForConsumer.CONSUMER_GROUP_NAME,
                InitConfigForConsumer.OFFSET_WHEN_OFFSET_INFO_IS_ABSENT_IN_ZK);
    }

    /**
     * Creates a consumer with automatic offset commit enabled.
     *
     * @param zkUrl                            value for the {@code zookeeper.connect} property
     * @param consumedKafkaKeyPrefix           key prefix filter, or {@code null} to accept all keys
     * @param groupName                        value for the {@code group.id} property
     * @param offsetWhenOffsetInfoIsAbsentInZk value for the {@code auto.offset.reset} property
     */
    public KafkaConsumer(String zkUrl, String consumedKafkaKeyPrefix, String groupName,
                         String offsetWhenOffsetInfoIsAbsentInZk) {
        this(zkUrl, consumedKafkaKeyPrefix, groupName, offsetWhenOffsetInfoIsAbsentInZk, true);
    }

    /**
     * Creates the consumer connector from the given settings.
     *
     * @param zkUrl                            value for the {@code zookeeper.connect} property
     * @param consumedKafkaKeyPrefix           key prefix filter, or {@code null} to accept all keys
     * @param groupName                        value for the {@code group.id} property
     * @param offsetWhenOffsetInfoIsAbsentInZk value for the {@code auto.offset.reset} property
     * @param isOffsetAutoCommit               value for the {@code auto.commit.enable} property;
     *                                         when {@code false}, subclasses are expected to call
     *                                         {@link #commitOffsets()} themselves
     */
    public KafkaConsumer(String zkUrl, String consumedKafkaKeyPrefix, String groupName,
                         String offsetWhenOffsetInfoIsAbsentInZk, boolean isOffsetAutoCommit) {
        // create consumer
        Properties props = new Properties();
        props.put("zookeeper.connect", zkUrl);
        props.put("group.id", groupName);
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.commit.enable", String.valueOf(isOffsetAutoCommit));
        props.put("auto.offset.reset", offsetWhenOffsetInfoIsAbsentInZk);
        // NOTE(review): "serializer.class" looks like a producer-side setting; kept unchanged,
        // confirm whether the consumer needs it at all.
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ConsumerConfig config = new ConsumerConfig(props);
        this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);

        this.consumedKafkaKeyPrefix = consumedKafkaKeyPrefix;
    }

    /**
     * Sets the converter used by {@link #consume(String)}. When
     * {@link InitConfig#WHETHER_TO_RECORD_METHOD_TIME} is set, the converter is wrapped in the
     * proxy returned by {@link MethodTimeRecorderUtil#getProxyThatRecordsMethodTime}.
     *
     * @param converter converter to apply to each consumed message
     */
    @Override
    public final void setMessageToRecordConverter(MessageToRecordConverterInter converter) {
        if (InitConfig.WHETHER_TO_RECORD_METHOD_TIME) {
            this.messageToRecordConverterInter = MethodTimeRecorderUtil.getProxyThatRecordsMethodTime(
                    converter, MessageToRecordConverterInter.class);
        } else {
            this.messageToRecordConverterInter = converter;
        }
    }

    /**
     * Consumes messages from one stream of {@code topic} for as long as the stream iterator
     * yields messages, converting each accepted message and handing the result to
     * {@code sendKmxRecordResult}.
     *
     * @param topic topic to consume
     * @throws IllegalStateException if no converter has been set via
     *                               {@link #setMessageToRecordConverter(MessageToRecordConverterInter)}
     */
    @Override
    public final void consume(String topic) {
        // Fail fast with a clear message instead of a NullPointerException on the first message.
        if (messageToRecordConverterInter == null) {
            throw new IllegalStateException(
                    "MessageToRecordConverter must be set before consuming topic " + topic);
        }

        // topic -> number of streams; a single stream is requested
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topic, 1);
        // decode both key and value as String
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        // get consumer map
        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<String, String> it = stream.iterator();

        // get message
        while (it.hasNext()) {
            MessageAndMetadata<String, String> message = it.next();
            String key;
            String msg;
            try {
                key = message.key();
                msg = message.message();
                if (msg == null) {
                    LOGGER.debug("Receive null message...");
                }
            } catch (Exception e) {
                // Trailing throwable argument makes SLF4J log the stack trace as well.
                LOGGER.warn("Receive msg fails: {}.", e.getMessage(), e);
                continue;
            }
            if (LOGGER.isDebugEnabled()) {
                // Log the decoded payload, not the MessageAndMetadata wrapper.
                LOGGER.debug("Consume message success, key is {}, message is {},", key, msg);
            }
            if (isKeyAccepted(key)) {
                KmxRecordResult kmxRecordResult = messageToRecordConverterInter.convertMessageToRecord(msg);

                sendKmxRecordResult(kmxRecordResult);
            }
        }

    }

    /**
     * Tells whether a message with the given key passes the prefix filter.
     *
     * @param key message key, possibly {@code null}
     * @return {@code true} when no prefix is configured, or when {@code key} is non-null and
     *         starts with the configured prefix
     */
    private boolean isKeyAccepted(String key) {
        if (consumedKafkaKeyPrefix == null) {
            return true;
        }
        // A message without a key cannot match a prefix; skip it rather than throw.
        return key != null && key.startsWith(consumedKafkaKeyPrefix);
    }

    /**
     * Call this method when offsets must be committed manually, i.e. when the Kafka
     * 'auto.commit.enable' setting is 'false'.
     */
    protected void commitOffsets() {
        consumer.commitOffsets();
    }
}
